/*
 * Copyright (c) 2020 - present, Inspur Genersoft Co., Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.iec.edp.caf.msu.client.health;

import io.iec.edp.caf.commons.transaction.JpaTransaction;
import io.iec.edp.caf.commons.transaction.TransactionPropagation;
import io.iec.edp.caf.commons.utils.SpringBeanUtils;
import io.iec.edp.caf.datasource.CAFDataSourceSelector;
import io.iec.edp.caf.lock.service.api.api.DistributedLock;
import io.iec.edp.caf.lock.service.api.api.DistributedLockFactory;
import io.iec.edp.caf.msu.api.entity.ServiceUnitInfo;
import io.iec.edp.caf.msu.common.domain.entity.GspAppServerEntity;
import io.iec.edp.caf.msu.common.domain.entity.GspSuEntity;
import io.iec.edp.caf.msu.common.domain.repository.AppServerRepository;
import io.iec.edp.caf.msu.common.domain.repository.SuRepository;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * DB App实例的心跳
 *
 * @author manwenxing01
 * @date 2023-02-01 下午 14:00
 */
@Slf4j
public class DbHealthCheck {

    //App repo
    private AppServerRepository appRepo;

    //su repo
    private SuRepository suRepo;

    //instances
    private List<GspAppServerEntity> instances;
    private Map<String, List<ServiceUnitInfo>> maps;


    //DB健康检查的参数
    private DbHealthSetting setting;

    //任务
    private ScheduledExecutorService executorService;

    public DbHealthCheck(AppServerRepository appRepo, SuRepository suRepo, List<GspAppServerEntity> appEntities, Map<String, List<ServiceUnitInfo>> maps) {
        this.appRepo = appRepo;
        this.suRepo = suRepo;
        this.instances = appEntities;
        this.maps = maps;
        this.setting = SpringBeanUtils.getBean(DbHealthSetting.class);
        this.executorService = new ScheduledThreadPoolExecutor(1, r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("DbHealthCheck");
            return thread;
        });
    }

    //检测
    public void start() {
        executorService.schedule(new DbBeatTask(), setting.getBeatPeriod(), TimeUnit.SECONDS);
    }

    //终止任务
    public void stop() {
        executorService.shutdown();
    }

    /**
     * 心跳上报 + 健康检测
     */
    class DbBeatTask implements Runnable {

        @Override
        public void run() {
            try {
                //设置主库
                CAFDataSourceSelector.selectMaster();

                JpaTransaction transaction = JpaTransaction.getTransaction();
                try {
                    transaction.begin(TransactionPropagation.REQUIRES_NEW);
                    //心跳上报 + 健康检测
                    doHealthyCheck();

                    transaction.commit();
                } catch (Throwable e) {
                    transaction.rollback();
                    if (log.isErrorEnabled()) {
                        log.error("HealthyCheck error {}", e.getMessage());
                        log.debug("Transaction exception", e);
                    }
                }
            } catch (Throwable e) {
                log.error("(DBHealthCheck) HealthyCheck error", e);
            } finally {
                //重置数据库信息
                CAFDataSourceSelector.reset();
                executorService.schedule(new DbBeatTask(), setting.getBeatPeriod(), TimeUnit.SECONDS);
            }
        }

        /**
         * instance心跳上报 + instances健康检测
         * 一、心跳上报
         * get取到数据：设置为健康状态；更新beatTime
         * get不到数据：新增此数据+新增su
         * 二、健康检测
         * beatTime == null 脏数据 删除
         * current - beatTime > RemovePeriod instance被清理
         */
        private void doHealthyCheck() {
            //数据库时间
            Date currentTimestamp = appRepo.getCurrentTimestamp();
            if (currentTimestamp == null) {
                currentTimestamp = new Date();
            } else {
                currentTimestamp.setTime(currentTimestamp.getTime());
            }
            //数据库时间转换为OffsetDateTime
            OffsetDateTime beatTime = currentTimestamp.toInstant().atZone(ZoneOffset.systemDefault()).toOffsetDateTime();
            //所有app实例
            List<GspAppServerEntity> allInstances = appRepo.findAll();

            //遍历实例内部的实例
            for (GspAppServerEntity inner : DbHealthCheck.this.instances) {
                String serviceName = inner.getAppName();

                //当前应用实例
                GspAppServerEntity appInstance = null;
                Optional<GspAppServerEntity> optional = allInstances.stream().filter(x -> serviceName.equals(x.getAppName())).findFirst();
                if (optional.isPresent()) {
                    appInstance = optional.get();
                }

                //心跳上报
                if (appInstance != null) {
                    //存在
                    appInstance.setBeatTime(beatTime);
                    //appServerEntity.setHealthy(true);     暂不使用
                    appRepo.save(appInstance);

                    if (log.isInfoEnabled()) log.info("[{}] beats", serviceName);

                    //清理allInstance中实例，减轻健康检查负担
                    allInstances.remove(appInstance);
                } else {
                    //清理su信息、实例信息
                    suRepo.deleteByApp(serviceName);       //此处是一重保险，避免非优雅停机导致数据未清理

                    //注册实例信息、su信息
                    //注册实例信息
                    inner.setBeatTime(beatTime);
                    //appEntity.setHealthy(true);   暂不使用
                    appRepo.save(inner);
                    //注册su信息
                    List<ServiceUnitInfo> suInfos = DbHealthCheck.this.maps.get(serviceName);
                    if (suInfos != null && suInfos.size() > 0) {
                        if (log.isInfoEnabled()) {
                            log.info("Start to register instance [{}]", serviceName);
                        }
                        for (ServiceUnitInfo su : suInfos) {
                            if (su != null) {
                                String suName = su.getName().toLowerCase();
                                if (suRepo.countByAppAndSu(serviceName, suName) == 0) {
                                    GspSuEntity suEntity = new GspSuEntity();
                                    suEntity.setId(UUID.randomUUID().toString());
                                    suEntity.setApp(serviceName);
                                    suEntity.setSu(suName);
                                    suRepo.save(suEntity);

                                    if (log.isInfoEnabled())
                                        log.info("Success to register su [{}]", suName);
                                }
                            }
                        }
                    }
                }
            }


            DistributedLockFactory lockFactory = SpringBeanUtils.getBean(DistributedLockFactory.class);
            DistributedLock lock = null;
            //加锁
            try {
                lock = lockFactory.tryCreateLock(Duration.ofMillis(100), "DB-Healthy-Check", Duration.ofSeconds(60));
                if (lock == null || lock.isAcquired()) {
                    log.info("Healthy Check starts");
                    //健康检测（检测其他app）
                    if (allInstances.size() > 0) {
                        long current = currentTimestamp.getTime() / 1000;   //变为秒
                        for (GspAppServerEntity entity : allInstances) {
                            OffsetDateTime dateTime = entity.getBeatTime();
                            if (dateTime == null) {
                                //脏数据
                                suRepo.deleteByApp(entity.getAppName());       //此处是一重保险，避免非优雅停机导致数据未清理
                                appRepo.deleteByAppName(entity.getAppName());
                            } else {
                                long beat = Date.from(dateTime.toInstant()).getTime() / 1000;
                                if (current - beat > setting.getRemovePeriod()) {
                                    suRepo.deleteByApp(entity.getAppName());       //此处是一重保险，避免非优雅停机导致数据未清理
                                    appRepo.deleteByAppName(entity.getAppName());

                                    if (log.isInfoEnabled()) {
                                        log.info("Remove [{}], URL [{}]", entity.getAppName(), entity.getAppUrl());
                                    }
                                    //暂不使用healthy字段
//                        } else if (current - beat > 15) {
//                            entity.setHealthy(false);
//                            appRepo.save(entity);
//                            log.info("(DBHealthCheck) {} is marked as unhealthy, URL {}", entity.getAppName(), entity.getAppUrl());
                                }
                            }
                        }
                    }
                }
            } catch (Exception e) {
                log.error("Healthy Check Error", e);
            } finally {
                //移除锁
                if (lock != null) {
                    try {
                        lock.close();
                    } catch (IOException e) {
                        log.error("Unlock failed", e);
                    }
                }
            }
        }
    }

}
